package com.carol.bigdata.utils

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object RddReader {

    /**
      * Reads one day's partition of a Hive table (ORC-backed) for a single event and
      * game, returning each row as a `List[String]` in the same order as the selected
      * columns. `null` cells are rendered as the literal string `"null"`.
      *
      * @param spark       active SparkSession
      * @param HbaseTable  table identifier in the form `"database:table"`
      * @param statDay     partition day, compared as `day = cast('<statDay>' as date)`
      * @param event       event name filter (`event = '<event>'`)
      * @param game        game id filter (`game_id = <game>`, interpolated unquoted)
      * @param columns     plain columns to select
      * @param propColumns keys pulled from the `properties` map column as `properties['k']`
      * @param otherColumns extra raw select expressions appended after propColumns
      * @return RDD of rows, each a List[String] aligned with columns ++ propColumns ++ otherColumns
      */
    def readHiveRDD(spark: SparkSession,
                    HbaseTable:String,
                    statDay:String,
                    event:String,
                    game:String,
                    columns:List[String],
                    propColumns:List[String] = List(),
                    otherColumns:List[String] = List()): RDD[List[String]] ={
        // "database:table" — split once; tableInfo(1) will throw if the ':' is missing,
        // which surfaces a malformed identifier early.
        val tableInfo = HbaseTable.split(":")
        val databases = tableInfo(0)
        val table = tableInfo(1)
        // Map-typed `properties` entries are projected with Hive's bracket syntax.
        val columnList: List[String] = columns ++ propColumns.map(x => s"properties['$x']") ++ otherColumns
        val columnStr: String = columnList.mkString(",")
        // NOTE(review): SQL is built by raw interpolation. Callers must pass trusted
        // values for statDay/event/game — injection is possible otherwise. Spark SQL
        // offers no bind parameters here, so this is flagged rather than rewritten.
        val sql =
            s"""
               |select ${columnStr}
               |from ${databases}.${table}
               |where day=cast('${statDay}' as date) and event='${event}' and game_id=${game}
               |""".stripMargin
        println("sql:")
        println(sql)
        val dataFrame: DataFrame = spark.sql(sql)
        dataFrame.show(5)
        // Build each row functionally; Option(..) maps SQL NULL to None, rendered as
        // "null". This replaces the previous var + `:+=` loop, whose repeated List
        // append was O(n^2) per row.
        val resultRDD: RDD[List[String]] = dataFrame.rdd.map { row =>
            columnList.indices.map { i =>
                Option(row.get(i)).fold("null")(_.toString)
            }.toList
        }
        // NOTE: count() triggers a full Spark job just for this log line.
        println(s"hiveResultRDD: ${resultRDD.count()}")
        resultRDD
    }

    /**
      * Reads Hive partition files (ORC) directly from an HDFS path and returns each
      * row as a `List[String]`. Result layout per row:
      * `List(gameId, columns..., propColumns...)` when `gameId != "null"`, otherwise
      * `List(columns..., propColumns...)`. If the path does not exist, an empty RDD
      * is returned instead of failing.
      *
      * @param spark       active SparkSession
      * @param pathStr     HDFS path of the ORC files
      * @param columns     plain string columns to extract from each row
      * @param propColumns keys looked up in the row's `properties` map (missing -> "null")
      * @param gameId      optional id prepended to every row; the sentinel "null" disables it
      * @return RDD of per-row string lists (empty RDD when the path is absent)
      */
    def readHdfsRDD(spark: SparkSession,
                    pathStr:String,
                    columns:List[String],
                    propColumns:List[String],
                    gameId:String = "null"): RDD[List[String]] ={
        val filePath = new Path(pathStr)
        // Resolve the FileSystem from the path itself so non-default schemes work too.
        val fileSystem: FileSystem = filePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
        if (fileSystem.exists(filePath)) {
            val value: DataFrame = spark.read.orc(pathStr)
              .select("properties", columns: _*)
            value.show(10, false)
            value.rdd.map { row =>
                val properties = row.getAs[Map[String, String]]("properties")
                // NOTE(review): getAs[String] can return null for NULL cells here,
                // unlike readHiveRDD which renders them as "null" — confirm whether
                // downstream consumers rely on actual nulls before unifying.
                val plain = columns.map(c => row.getAs[String](c))
                val props = propColumns.map(k => properties.getOrElse(k, "null"))
                val base = plain ++ props
                // Prepend gameId unless the caller left the "null" sentinel default.
                if (gameId != "null") gameId :: base else base
            }
        } else {
            // Missing path: hand back an empty RDD (replaces the mutable
            // var-seeded makeRDD(List()) pattern with an expression).
            spark.sparkContext.emptyRDD[List[String]]
        }
    }
}
